package cn.uncode.dal.utils;

import cn.uncode.dal.asyn.AsynContext;
import cn.uncode.dal.asyn.Method;
import cn.uncode.dal.asyn.ShardsBatchCallable;
import cn.uncode.dal.asyn.ShardsQueryCallable;
import cn.uncode.dal.core.BaseDAL;
import cn.uncode.dal.core.BaseDTO;
import cn.uncode.dal.criteria.QueryCriteria;
import cn.uncode.dal.datasource.DBContextHolder;
import cn.uncode.dal.descriptor.QueryResult;
import cn.uncode.dal.router.TableShardingRouter;
import org.apache.commons.lang3.StringUtils;

import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.*;

public class ShardsUtils {
	
	private static final int CALLABLE_FUTURE_TIMEOUT_SECOND = 1000;
	
	/**
	 * 对分表后的查询结果进行重排
	 * @param result 结果
	 * @param queryCriteria 查询条件
	 * @return 排序后的结果
	 */
	public static List<Map<String, Object>> complieResult(List<Map<String, Object>> result, final QueryCriteria queryCriteria) {
		List<Map<String, Object>> resultList = new ArrayList<Map<String, Object>>();
		
			if (StringUtils.isNotEmpty(queryCriteria.getOrderByClause())) {
				final String[] fds = queryCriteria.getOrderByClause().toLowerCase().split(",");
				LinkedHashMap<String, String> orderFd = new LinkedHashMap<String, String>();
				if(fds != null && fds.length > 0){
					for(String fd:fds){
						if(fd.indexOf("desc") != -1){
							String val = fd.replaceAll("desc", "");
							orderFd.put(val.trim(), "desc");
						}else{
							String val = fd.replaceAll("asc", "");
							orderFd.put(val.trim(), "asc");
						}
					}
				}
				if(result != null){
					Collections.sort(result, new Comparator<Map<String, Object>>() {
						@Override
						public int compare(Map<String, Object> a, Map<String, Object> b) {
							int result = 0;
							for(Entry<String, String> item:orderFd.entrySet()){
								if(result == 0){
									Object va = a.get(item.getKey());
									Object vb = b.get(item.getKey());
									if("desc".equals(item.getValue())){
										result = (String.valueOf(va).compareTo(String.valueOf(vb))) * -1;
									}else{
										result = String.valueOf(va).compareTo(String.valueOf(vb));
									}
								}
							}
							return result;
						}
					});
				}
			}
			if(queryCriteria.getSelectOne()){
				resultList.add(result.get(0));
			}else{
//				if(result.size() > queryCriteria.getPageSize()){
//					resultList.addAll(result.subList(0, queryCriteria.getPageSize()));
//				}else{
//					resultList.addAll(result.subList(0, result.size()));
//				}
				resultList.addAll(result);
			}
		return resultList;
	}
	
	/**
	 * 分表后执行行更新操作
	 * @param executorService 线程池
	 * @param baseDAL 操作类
	 * @param obj 更新对象
	 * @param queryCriteria 更新条件
	 * @param method 方法类型
	 * @return 更新的条数
	 */
	public static int shardingBatchUpdate(ExecutorService executorService, BaseDAL baseDAL, Object obj, QueryCriteria queryCriteria, Method method){
		int total = 0;
		List<Future<Integer>> list = new ArrayList<Future<Integer>>();
		if(queryCriteria != null){
			List<String> tables = TableShardingRouter.getShardingTables(queryCriteria);
			if(tables != null){
				for(String tb:tables){
					AsynContext asynContext = new AsynContext(method, obj);
	            	asynContext.setTable(tb);
	            	queryCriteria.setTable(tb);
	            	asynContext.setQueryCriteria(queryCriteria);
	            	Callable<Integer> callable = new ShardsBatchCallable(baseDAL, asynContext);
	            	Future<Integer> future = executorService.submit(callable);
	            	list.add(future);
				}
			}
			for(Future<Integer> future:list){
				Integer result = null;
				try {
					result = future.get(CALLABLE_FUTURE_TIMEOUT_SECOND, TimeUnit.SECONDS);
				} catch (TimeoutException | InterruptedException | ExecutionException e) {
					e.printStackTrace();
				}
				if(null != result){
					total += result;
				}
			}
		}
		return total;
	}

	/**
	 * 分表后执行行更新操作(带事务)
	 * @param baseDAL 操作类
	 * @param obj 更新对象
	 * @param queryCriteria 更新条件
	 * @return 更新的条数
	 */
	public static int shardingBatchUpdate(BaseDAL baseDAL, Object obj, QueryCriteria queryCriteria){
		int total = 0;
		if(queryCriteria != null){
			List<String> tables = TableShardingRouter.getShardingTables(queryCriteria);
			if(tables != null){
				for(String tb:tables){
					queryCriteria.setTable(tb);
					int result=baseDAL.updateByCriteria(obj,queryCriteria);
					total += result;
				}
			}
		}
		return total;
	}
	
	
	public static int shardingIndexTableInsert(ExecutorService executorService, BaseDAL baseDAL, Map<String, Object> content, String tableName){
		List<Map<String, Object>> list = new ArrayList<>();
		list.add(content);
		return shardingIndexTableInsert(executorService, baseDAL, list, tableName);
	}
	
	/**
	 * 向分表后插入索引数据
	 * @param executorService 线程池
	 * @param baseDAL 操作类
	 * @param content 数据集
	 * @param tableName 名表
	 * @return 插入的条数
	 */
	public static int shardingIndexTableInsert(ExecutorService executorService, BaseDAL baseDAL, List<Map<String, Object>> content, String tableName){
		int total = 0;
		if(content != null){
			AsynContext asynContext = new AsynContext(Method.INSERT_BATCH, content);
        	asynContext.setTable(tableName + "index");
        	Callable<Integer> callable = new ShardsBatchCallable(baseDAL, asynContext);
        	Future<Integer> future = executorService.submit(callable);
			try {
				total = future.get(CALLABLE_FUTURE_TIMEOUT_SECOND, TimeUnit.SECONDS);
			} catch (TimeoutException | InterruptedException | ExecutionException e) {
				e.printStackTrace();
			}
		}
		return total;
	}
	
	
	/**
	 * 向分表后的多个表中批量插入数据
	 * @param executorService 线程池
	 * @param baseDAL 操作类
	 * @param ids 数据集
	 * @param tableName 名表
	 * @return 插入的条数
	 */
	public static QueryResult shardingSelectByIds(ExecutorService executorService, BaseDAL baseDAL, List<Object> ids, String tableName, List<String> fields){
		QueryResult allQueryResult = new QueryResult();
		List<Future<QueryResult>> list = new ArrayList<Future<QueryResult>>();
		Map<String, List<Long>> shard = new HashMap<>();
		if(ids != null){
            for(int i = 0; i < ids.size(); i++){
        		Object item = ids.get(i);
        		long id = (long) item;
        		String shardingTable = TableShardingRouter.getShardingTableById(tableName, id);
        		if(shard.containsKey(shardingTable)){
        			shard.get(shardingTable).add(id);
        		}else{
        			List<Long> listTemp = new ArrayList<>();
        			listTemp.add(id);
        			shard.put(shardingTable, listTemp);
        		}
            }
			
			for(Entry<String, List<Long>> item : shard.entrySet()){
				AsynContext asynContext = new AsynContext(Method.SELECT_BY_IDS, item.getValue());
            	asynContext.setTable(item.getKey());
            	asynContext.setFields(fields);
            	Callable<QueryResult> callable = new ShardsQueryCallable(baseDAL, asynContext);
            	Future<QueryResult> future = executorService.submit(callable);
            	list.add(future);
	        }
			
			for(Future<QueryResult> future:list){
				QueryResult result = null;
				try {
					result = future.get(CALLABLE_FUTURE_TIMEOUT_SECOND, TimeUnit.SECONDS);
				} catch (TimeoutException | InterruptedException | ExecutionException e) {
					e.printStackTrace();
				}
				if(null != result){
					if(result.getList() != null && result.getList().size() > 0){
						allQueryResult.addShardResult(result.getList());
					}
				}
			}
		}
		return allQueryResult;
	}
	
	/**
	 * 向分表后的多个表中批量插入数据
	 * @param executorService 线程池
	 * @param baseDAL 操作类
	 * @param content 数据集
	 * @param tableName 名表
	 * @return 插入的条数
	 */
	public static int shardingBatchInsert(ExecutorService executorService, BaseDAL baseDAL, List<Map<String, Object>> content, String tableName){
		int total = 0;
		List<Future<Integer>> list = new ArrayList<Future<Integer>>();
		Map<String, List<Map<String, Object>>> shard = new HashMap<>();
		List<Map<String, Object>> listIndex = new ArrayList<>();
		if(content != null){
            for(int i = 0; i < content.size(); i++){
        		Object item = content.get(i);
        		@SuppressWarnings("unchecked")
				Map<String, Object> objMap = (Map<String, Object>) JsonUtils.objToMap(item);
        		long id = TableShardingRouter.generateShardingId(tableName, objMap);
        		objMap.put(BaseDTO.ID, id);
        		listIndex.add(objMap);
        		String shardingTable = TableShardingRouter.getShardingTableById(tableName, id);
        		if(shard.containsKey(shardingTable)){
        			shard.get(shardingTable).add(objMap);
        		}else{
        			List<Map<String, Object>> listTemp = new ArrayList<>();
        			listTemp.add(objMap);
        			shard.put(shardingTable, listTemp);
        		}
            }
			
			for(Entry<String, List<Map<String, Object>>> item : shard.entrySet()){
				AsynContext asynContext = new AsynContext(Method.INSERT_BATCH, item.getValue());
            	asynContext.setTable(item.getKey());
            	Callable<Integer> callable = new ShardsBatchCallable(baseDAL, asynContext);
            	Future<Integer> future = executorService.submit(callable);
            	list.add(future);
	        }
			
			for(Future<Integer> future:list){
				Integer result = null;
				try {
					result = future.get(CALLABLE_FUTURE_TIMEOUT_SECOND, TimeUnit.SECONDS);
				} catch (TimeoutException | InterruptedException | ExecutionException e) {
					e.printStackTrace();
				}
				if(null != result){
					total += result;
				}
			}
			
        	Set<String> fields = TableShardingRouter.getIndexTableField(tableName);
        	if(null != fields && fields.size() > 0){
        		ShardsUtils.shardingIndexTableInsert(executorService, baseDAL, listIndex, tableName);
        	}
			
		}
		return total;
	}
	
	/**
	 * 向分表后的多个表中批量插入数据
	 * @param executorService 线程池
	 * @param baseDAL 操作类
	 * @param queryCriteria 查询条件
	 * @return 插入的条数
	 */
	public static int shardingCount(ExecutorService executorService, BaseDAL baseDAL, QueryCriteria queryCriteria){
		int total = 0;
		List<Future<Integer>> list = new ArrayList<Future<Integer>>();
		if (TableShardingRouter.containsTable(queryCriteria.getTable())) {
			List<String> ranges = TableShardingRouter.getShardingTables(queryCriteria);
			if (ranges != null && ranges.size() > 0) {
				for (int i = 0; i < ranges.size(); i++) {
					QueryCriteria clone = queryCriteria.clone();
					clone.setTable(ranges.get(i));
					AsynContext asynContext = new AsynContext(Method.COUNT_BY_CRITERIA, clone);
					asynContext.setQueryCriteria(clone);
					asynContext.setDatabase(DBContextHolder.getCurrentDataSourceKey());
					Callable<Integer> callable = new ShardsBatchCallable(baseDAL, asynContext);
					Future<Integer> future = executorService.submit(callable);
					list.add(future);
				}
			}
		}
		for(Future<Integer> future:list){
			Integer result = null;
			try {
				result = future.get(CALLABLE_FUTURE_TIMEOUT_SECOND, TimeUnit.SECONDS);
			} catch (TimeoutException | InterruptedException | ExecutionException e) {
				e.printStackTrace();
			}
			if(null != result){
				total += result;
			}
		}
		return total;
	}
	
	
	/**
	 * 从多个分表数据中查询数据合并排序不带分页
	 * @param executorService 线程池
	 * @param baseDAL 操作类
	 * @param queryCriteria 查询条件
	 * @param fields 查询字段
	 * @return 查询结果
	 */
	public static QueryResult shardingSelectByCriteria(ExecutorService executorService, BaseDAL baseDAL, QueryCriteria queryCriteria, List<String> fields) {
		QueryResult allQueryResult = new QueryResult();

		if (TableShardingRouter.containsTable(queryCriteria.getTable())) {
			List<String> ranges = TableShardingRouter.getShardingTables(queryCriteria);
			if (ranges != null && ranges.size() > 0) {
				if(ranges.size()==1){
					if(StringUtils.isNotBlank(DBContextHolder.getCurrentDataSourceKey())){
						DBContextHolder.swithTo(DBContextHolder.getCurrentDataSourceKey());
					}
					QueryCriteria clone = queryCriteria.clone();
					clone.setTable(ranges.get(0));
					return baseDAL.selectByCriteria(fields,clone);
				}else{
					List<Future<QueryResult>> list = new ArrayList<Future<QueryResult>>();
					for (int i = 0; i < ranges.size(); i++) {
						QueryCriteria clone = queryCriteria.clone();
						AsynContext asynContext = new AsynContext(Method.SELECT_BY_CRITERIA, clone);
						asynContext.setTable(ranges.get(i));
						asynContext.setFields(fields);
						asynContext.setDatabase(DBContextHolder.getCurrentDataSourceKey());
						Callable<QueryResult> callable = new ShardsQueryCallable(baseDAL, asynContext);
						Future<QueryResult> future = executorService.submit(callable);
						list.add(future);
					}
					for (Future<QueryResult> future : list) {
						QueryResult queryResult = null;
						try {
							queryResult = future.get(CALLABLE_FUTURE_TIMEOUT_SECOND, TimeUnit.SECONDS);
						} catch (TimeoutException | InterruptedException | ExecutionException e) {
							e.printStackTrace();
						}
						if (null != queryResult) {
							if (queryResult.getList() != null && queryResult.getList().size() > 0) {
								allQueryResult.addShardResult(queryResult.getList());
							}
						}
					}
				}
			}
		}
		if (allQueryResult != null && allQueryResult.getList() != null && allQueryResult.getList().size() > 0) {
			List<Map<String, Object>> rtList = ShardsUtils.complieResult(allQueryResult.getList(), queryCriteria);
			allQueryResult.clear();
			allQueryResult.addShardResult(rtList);
			if (queryCriteria.getLimit() > 0 && queryCriteria.getRecordIndex() >= 0) {
				if(rtList.size()>queryCriteria.getRecordIndex()){
					if(rtList.size() <= queryCriteria.getLimit()) {
						allQueryResult.setResultList(rtList.subList(queryCriteria.getRecordIndex(), rtList.size()));
					}else {
						allQueryResult.setResultList(rtList.subList(queryCriteria.getRecordIndex(), (queryCriteria.getRecordIndex()+queryCriteria.getLimit())));
					}
				}
			} else {
				int total = rtList.size();
				if (total > 0) {
					int pageCount = total / queryCriteria.getPageSize();
					if (total % queryCriteria.getPageSize() != 0) {
						pageCount++;
					}
					if (queryCriteria.getPageIndex() > pageCount) {
						queryCriteria.setPageIndex(pageCount);
					}
					int start = (queryCriteria.getPageIndex() - 1) * queryCriteria.getPageSize();
					if (rtList.size() > (start+queryCriteria.getPageSize())) {
						allQueryResult.setResultList(new ArrayList(rtList.subList(start, (start+queryCriteria.getPageSize()))));
					} else {
						allQueryResult.setResultList(new ArrayList(rtList.subList(start, rtList.size())));
					}
				}
			}
		}
		return allQueryResult;

	}

	/**
	 * 从多个分表数据中查询数据合并排序带分页
	 * @param executorService 线程池
	 * @param baseDAL 操作类
	 * @param queryCriteria 查询条件
	 * @param fields 查询字段
	 * @return 查询结果
	 */
	public static QueryResult shardingSelectPageByCriteria(ExecutorService executorService, BaseDAL baseDAL, QueryCriteria queryCriteria, List<String> fields) {
		QueryResult allQueryResult = new QueryResult();

		if (TableShardingRouter.containsTable(queryCriteria.getTable())) {
			List<String> ranges = TableShardingRouter.getShardingTables(queryCriteria);
			if (ranges != null && ranges.size() > 0) {
				if(ranges.size()==1){
					if(StringUtils.isNotBlank(DBContextHolder.getCurrentDataSourceKey())){
						DBContextHolder.swithTo(DBContextHolder.getCurrentDataSourceKey());
					}
					QueryCriteria clone = queryCriteria.clone();
					clone.setTable(ranges.get(0));
					return baseDAL.selectPageByCriteria(fields,clone);
				}else{
					List<Future<QueryResult>> list = new ArrayList<Future<QueryResult>>();
					for (int i = 0; i < ranges.size(); i++) {
						QueryCriteria clone = queryCriteria.clone();
						AsynContext asynContext = new AsynContext(Method.SELECT_BY_CRITERIA, clone);
						asynContext.setTable(ranges.get(i));
						asynContext.setFields(fields);
						asynContext.setDatabase(DBContextHolder.getCurrentDataSourceKey());
						Callable<QueryResult> callable = new ShardsQueryCallable(baseDAL, asynContext);
						Future<QueryResult> future = executorService.submit(callable);
						list.add(future);
					}
					for (Future<QueryResult> future : list) {
						QueryResult queryResult = null;
						try {
							queryResult = future.get(CALLABLE_FUTURE_TIMEOUT_SECOND, TimeUnit.SECONDS);
						} catch (TimeoutException | InterruptedException | ExecutionException e) {
							e.printStackTrace();
						}
						if (null != queryResult) {
							if (queryResult.getList() != null && queryResult.getList().size() > 0) {
								allQueryResult.addShardResult(queryResult.getList());
							}
						}
					}
				}
			}
		}
		if (allQueryResult != null && allQueryResult.getList() != null && allQueryResult.getList().size() > 0) {
			List<Map<String, Object>> rtList = ShardsUtils.complieResult(allQueryResult.getList(), queryCriteria);
			allQueryResult.clear();
			allQueryResult.addShardResult(rtList);
			int total = rtList.size();
			if (total > 0) {
				int pageCount = total / queryCriteria.getPageSize();
				if (total % queryCriteria.getPageSize() != 0) {
					pageCount++;
				}
				if (queryCriteria.getPageIndex() > pageCount) {
					queryCriteria.setPageIndex(pageCount);
				}
				Map<String, Object> page = new HashMap<String, Object>();
				page.put(BaseDAL.PAGE_INDEX_KEY, queryCriteria.getPageIndex());
				page.put(BaseDAL.PAGE_SIZE_KEY, queryCriteria.getPageSize());
				page.put(BaseDAL.PAGE_COUNT_KEY, pageCount);
				page.put(BaseDAL.RECORD_TOTAL_KEY, total);
				allQueryResult.setPage(page);
				int start = (queryCriteria.getPageIndex() - 1) * queryCriteria.getPageSize();
				if (rtList.size() > (start+queryCriteria.getPageSize())) {
					allQueryResult.setResultList(new ArrayList(rtList.subList(start, (start+queryCriteria.getPageSize()))));
				} else {
					allQueryResult.setResultList(new ArrayList(rtList.subList(start, rtList.size())));
				}
			}
		}
		return allQueryResult;

	}

	/**
	 * 从多个分表数据中查询数据合并排序带分页(带事务)
	 * @param baseDAL 操作类
	 * @param queryCriteria 查询条件
	 * @param fields 查询字段
	 * @return 查询结果
	 */
	public static QueryResult shardingSelectByCriteria(BaseDAL baseDAL, QueryCriteria queryCriteria, List<String> fields) {
		QueryResult allQueryResult = new QueryResult();
		if (TableShardingRouter.containsTable(queryCriteria.getTable())) {
			List<String> ranges = TableShardingRouter.getShardingTables(queryCriteria);
			if (ranges != null && ranges.size() > 0) {
				for (int i = 0; i < ranges.size(); i++) {
					QueryCriteria clone = queryCriteria.clone();
					clone.setTable(ranges.get(i));
					QueryResult queryResult = baseDAL.selectByCriteria(fields, clone);
					if (null != queryResult) {
						if (queryResult.getList() != null && queryResult.getList().size() > 0) {
							allQueryResult.addShardResult(queryResult.getList());
						}
					}
				}
			}
		}
		if (allQueryResult != null && allQueryResult.getList() != null && allQueryResult.getList().size() > 0) {
			List<Map<String, Object>> rtList = ShardsUtils.complieResult(allQueryResult.getList(), queryCriteria);
			allQueryResult.clear();
			allQueryResult.addShardResult(rtList);
			if (queryCriteria.getLimit() > 0 && queryCriteria.getRecordIndex() >= 0) {
				if(rtList.size()>queryCriteria.getRecordIndex()){
					if(rtList.size() <= queryCriteria.getLimit()) {
						allQueryResult.setResultList(rtList.subList(queryCriteria.getRecordIndex(), rtList.size()));
					}else {
						allQueryResult.setResultList(rtList.subList(queryCriteria.getRecordIndex(), (queryCriteria.getRecordIndex()+queryCriteria.getLimit())));
					}
				}
			} else {
				int total = rtList.size();
				if (total > 0) {
					int pageCount = total / queryCriteria.getPageSize();
					if (total % queryCriteria.getPageSize() != 0) {
						pageCount++;
					}
					if (queryCriteria.getPageIndex() > pageCount) {
						queryCriteria.setPageIndex(pageCount);
					}
					Map<String, Object> page = new HashMap<String, Object>();
					page.put(BaseDAL.PAGE_INDEX_KEY, queryCriteria.getPageIndex());
					page.put(BaseDAL.PAGE_SIZE_KEY, queryCriteria.getPageSize());
					page.put(BaseDAL.PAGE_COUNT_KEY, pageCount);
					page.put(BaseDAL.RECORD_TOTAL_KEY, total);
					allQueryResult.setPage(page);
					int start = (queryCriteria.getPageIndex() - 1) * queryCriteria.getPageSize();
					if (rtList.size() > (start+queryCriteria.getPageSize())) {
						allQueryResult.setResultList(new ArrayList(rtList.subList(start, (start+queryCriteria.getPageSize()))));
					} else {
						allQueryResult.setResultList(new ArrayList(rtList.subList(start, rtList.size())));
					}
				}
			}
		}
		return allQueryResult;

	}
	
	public static Map<String, Object> buildIndexTableRecord(Map<String, Object> content, String tableName){
		List<Map<String, Object>> ctt = new ArrayList<>();
		if(content != null){
			ctt.add(content);
			List<Map<String, Object>> result = buildIndexTableRecord(ctt, tableName);
			if(result != null){
				return result.get(0);
			}
		}
		return null;
	}
	
	private static List<Map<String, Object>> buildIndexTableRecord(List<Map<String, Object>> content, String tableName){
		List<Map<String, Object>> result = new ArrayList<>();
		if(content != null){
			Set<String> fields = TableShardingRouter.getIndexTableField(tableName);
			if(null != fields && fields.size() > 0){
				for(Map<String, Object> obj:content){
					Map<String, Object> item = new HashMap<>();
					item.put(BaseDTO.ID, obj.get(BaseDTO.ID));
					for(String fd:fields){
						if(obj.containsKey(fd)){
							item.put(fd, obj.get(fd));
						}
					}
					result.add(item);
				}
			}
		}
		return result;
	}
	
	public static Object getPrimaryKeyFromContent(Map<String, Object> content) {
    	Object id = null;
    	for(String key:content.keySet()) {
    		if(BaseDTO.ID.toLowerCase().equals(key.toLowerCase().trim())) {
    			id = content.get(key);
    			break;
    		}
    	}
    	return id;
    }

}
